SpringBoot + Kafka 使用@KafkaListener注解批量消费 | 您所在的位置:网站首页 › springboot kafka 多个消费者 › SpringBoot + Kafka 使用@KafkaListener注解批量消费 |
使用@KafkaListener 注解进行批量消费时,出现如下报错: Cannot convert from [java.lang.String] to [org.apache.kafka.clients.consumer.ConsumerRecord]原因是默认没有开启批量监听的,解决办法是设置注解的 containerFactory 属性。 完整代码如下 1)、批量 消费监听工厂类 @Configuration public class KafkaConfiguration { /** * 解决批量消费的问题 * @param properties 配置信息,springboot 从配置文件获取, 自动注入 * @return 批量工厂类 */ @Bean public KafkaListenerContainerFactory batchFactory(KafkaProperties properties) { Map consumerProperties = properties.buildConsumerProperties(); ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory(); factory.setConsumerFactory(new DefaultKafkaConsumerFactory(consumerProperties)); factory.setBatchListener(true); // 开启批量监听 return factory; } }2)、消费监听 @Component @RequiredArgsConstructor public class GxdcKafkaConsumer { private final GxdcService gxdcService; /** * 共享单车消费Listener, 批量处理加上containerFactory = "batchFactory" * @param records 消息记录对象,此处为批量消费,若单条消费,此处改为ConsumerRecord * @param consumer 消费者对象,可以获取分区、主题等信息,也可进行手动提交操作 */ @KafkaListener(topics = {"${spring.kafka.consumer.topics.ods_dc_count_result}"}, containerFactory = "batchFactory") public void listen(ConsumerRecords records, Consumer consumer) { if (records.isEmpty()) { return; } // 消息逻辑处理 for (ConsumerRecord record : records) { switch (GxdcKeyEnum.getInstance(record.key())) { case DC_POINT: BikePointInfo pointInfo = JSON.parseObject(record.value(), BikePointInfo.class); gxdcService.saveBikePointInfo(pointInfo); break; case DC_ORDER: OrderSummaryInfo summaryInfo = JSON.parseObject(record.value(), OrderSummaryInfo.class); gxdcService.saveOrderSummaryInfo(summaryInfo); break; default: } } } } |
今日新闻 |
推荐新闻 |
专题文章 |
CopyRight 2018-2019 实验室设备网 版权所有 |